/*
 * Copyright 2015 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.cloud.stream.binder.gemfire;

import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;

import com.gemstone.gemfire.cache.EntryOperation;
import com.gemstone.gemfire.cache.PartitionResolver;

import org.springframework.messaging.Message;

/**
 * Key class for storing {@link Message messages} in a GemFire region.
 * This class consists of the following fields:
 * <ul>
 *     <li>A unique sequence id generated by the client; this id
 *         is used to determine the order of messages</li>
 *     <li>A producer id which is a combination of the producer PID
 *         and the timestamp of when the producer process was started.</li>
 *     <li>An integer used as the hash code for GemFire to select a
 *         bucket for storing the message in a partitioned region.</li>
 * </ul>
 * <p>
 * Only the sequence id, the encoded producer id and the routing hash are
 * written by {@link #writeExternal}; the producer timestamp is recovered
 * from the low 48 bits of the producer id by {@link #readExternal}.
 * Consequently only the low 16 bits of the PID and the low 48 bits of the
 * timestamp survive serialization.
 * <p>
 * Note that {@link #compareTo} orders keys by producer timestamp and then
 * sequence id only, whereas {@link #equals} also takes the producer PID and
 * the routing hash into account; the natural ordering is therefore not
 * consistent with {@code equals}.
 *
 * @author Patrick Peralta
 */
public final class MessageKey implements Externalizable, Comparable<MessageKey>,
		PartitionResolver<MessageKey, Message<?>> {

	/**
	 * Number of low-order bits of {@link #producerId} that hold the
	 * producer startup timestamp; the remaining high-order bits hold
	 * the producer PID.
	 */
	private static final int TIMESTAMP_BITS = 48;

	/**
	 * Bit mask selecting the timestamp portion of {@link #producerId}.
	 */
	private static final long TIMESTAMP_MASK = (1L << TIMESTAMP_BITS) - 1;

	/**
	 * Unique sequential id for message.
	 */
	private long sequenceId;

	/**
	 * Time stamp of when the producer process started. Marked
	 * {@code transient} because this value is serialized as
	 * part of {@link #producerId}.
	 */
	private transient long producerTimestamp;

	/**
	 * Encoded identifier for producer. First (leftmost) 16 bits are
	 * reserved for the PID of the producer process, rightmost 48
	 * bits are reserved for the timestamp of when the producer
	 * process was started.
	 */
	private long producerId;

	/**
	 * Object returned for {@link #getRoutingObject}.
	 */
	private int routingHash;

	/**
	 * Constructor for serialization.
	 */
	public MessageKey() {
	}

	/**
	 * Construct a {@code MessageKey} whose routing hash is derived from
	 * the sequence id and the producer id.
	 *
	 * @param sequenceId unique sequence id used for ordering messages
	 * @param producerTimestamp time stamp of when the producer process started
	 * @param producerPid PID of producer process
	 */
	public MessageKey(long sequenceId, long producerTimestamp, int producerPid) {
		this.sequenceId = sequenceId;
		this.producerTimestamp = producerTimestamp;
		this.producerId = producerId(producerTimestamp, producerPid);
		// routingHash is still 0 at this point, so the value stored here is
		// the hash of (sequenceId, producerId, 0); hashCode() returns a
		// different value once routingHash has been assigned.
		this.routingHash = this.hashCode();
	}

	/**
	 * Construct a {@code MessageKey} with a specific hash for selecting the
	 * partitioned region bucket.
	 *
	 * @param sequenceId unique sequence id used for ordering messages
	 * @param producerTimestamp time stamp of when the producer process started
	 * @param producerPid PID of producer process
	 * @param routingHash routing hash for bucket selection
	 */
	public MessageKey(long sequenceId, long producerTimestamp, int producerPid, int routingHash) {
		this.sequenceId = sequenceId;
		this.producerTimestamp = producerTimestamp;
		this.producerId = producerId(producerTimestamp, producerPid);
		this.routingHash = routingHash;
	}

	/**
	 * Return a producer id consisting of the timestamp of the process startup
	 * time for the producer and the PID of the producer.
	 * <p>
	 * The first (leftmost) 16 bits are reserved for the PID of the producer
	 * process, rightmost 48 bits are reserved for the timestamp of when the
	 * producer process was started. Only the low 16 bits of the PID and the
	 * low 48 bits of the timestamp are retained.
	 *
	 * @param producerTimestamp startup timestamp for producer
	 * @param producerPid PID for producer
	 * @return producer id
	 */
	private static long producerId(long producerTimestamp, int producerPid) {
		long pidBits = (long) producerPid << TIMESTAMP_BITS;
		// Mask the timestamp so that a negative or over-wide value cannot
		// spill into (and corrupt) the bits reserved for the PID.
		return pidBits | (producerTimestamp & TIMESTAMP_MASK);
	}

	/**
	 * Return the sequence id used for ordering messages.
	 *
	 * @return sequence id
	 */
	public long getSequenceId() {
		return sequenceId;
	}

	/**
	 * Return the producer id for this message key.
	 *
	 * @return producer id
	 */
	public long getProducerId() {
		return producerId;
	}

	/**
	 * Return the PID of the producer for this message key. The value is
	 * decoded from the high 16 bits of the producer id, so it is always in
	 * the range 0..65535.
	 *
	 * @return producer PID
	 */
	public int getProducerPid() {
		// Unsigned shift: the PID field must not be sign-extended.
		return (int) (producerId >>> TIMESTAMP_BITS);
	}

	/**
	 * Return the timestamp of when the producer of this message key started.
	 *
	 * @return producer startup timestamp
	 */
	public long getProducerTimestamp() {
		return producerTimestamp;
	}

	/**
	 * Order keys by producer startup timestamp and then by sequence id.
	 * The producer PID and the routing hash do not take part in the
	 * comparison, so this ordering is not consistent with {@link #equals}.
	 *
	 * @param that key to compare against
	 * @return negative, zero or positive as this key sorts before, equal to
	 * or after {@code that}
	 */
	@Override
	public int compareTo(MessageKey that) {
		int c = Long.compare(this.producerTimestamp, that.producerTimestamp);
		return (c == 0 ? Long.compare(this.sequenceId, that.sequenceId) : c);
	}

	@Override
	public boolean equals(Object o) {
		if (this == o) {
			return true;
		}
		if (o == null || getClass() != o.getClass()) {
			return false;
		}

		MessageKey that = (MessageKey) o;
		return this.sequenceId == that.sequenceId
				&& this.producerId == that.producerId
				&& this.routingHash == that.routingHash;
	}

	@Override
	public int hashCode() {
		int result = (int) (sequenceId ^ (sequenceId >>> 32));
		result = 31 * result + (int) (producerId ^ (producerId >>> 32));
		result = 31 * result + routingHash;
		return result;
	}

	@Override
	public String toString() {
		return "MessageKey{" +
				"sequenceId=" + sequenceId +
				", producerTimestamp=" + producerTimestamp +
				", producerId=" + producerId +
				", routingHash=" + routingHash +
				'}';
	}

	/**
	 * Return the routing hash of this key (boxed as an {@code Integer}).
	 * The supplied entry operation is not consulted.
	 *
	 * @param entryOperation the operation being routed; ignored
	 * @return routing hash of this key
	 */
	@Override
	public Object getRoutingObject(EntryOperation<MessageKey, Message<?>> entryOperation) {
		return routingHash;
	}

	/**
	 * Return the fully qualified class name as the name of this resolver.
	 *
	 * @return class name of this key
	 */
	@Override
	public String getName() {
		return this.getClass().getName();
	}

	/**
	 * No-op; this key holds no resources to release.
	 */
	@Override
	public void close() {
	}

	/**
	 * Write the sequence id, the encoded producer id and the routing hash.
	 * The producer timestamp is not written separately because it is part
	 * of the producer id.
	 *
	 * @param out stream to write to
	 * @throws IOException if writing to {@code out} fails
	 */
	@Override
	public void writeExternal(ObjectOutput out) throws IOException {
		out.writeLong(sequenceId);
		out.writeLong(producerId);
		out.writeInt(routingHash);
	}

	/**
	 * Read the fields written by {@link #writeExternal} and recover the
	 * producer timestamp from the low 48 bits of the producer id.
	 *
	 * @param in stream to read from
	 * @throws IOException if reading from {@code in} fails
	 * @throws ClassNotFoundException declared by {@link Externalizable};
	 * not thrown by this implementation
	 */
	@Override
	public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
		sequenceId = in.readLong();
		producerId = in.readLong();
		routingHash = in.readInt();
		// Mask rather than "<< 16 >> 16": the arithmetic right shift would
		// sign-extend bit 47 and turn large timestamps negative.
		producerTimestamp = producerId & TIMESTAMP_MASK;
	}

}

